/*
 * Copyright 2013 Pavel Stastny <pavel.stastny at gmail.com>.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.aplikator.server.processes.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.aplikator.server.processes.Process;
import org.aplikator.server.processes.ProcessManager;
import org.aplikator.server.processes.impl.processes.ipc.IPCChanelFactory;
import org.aplikator.server.processes.impl.processes.ipc.IPCChannel;
import org.aplikator.server.processes.impl.processes.ipc.IPCMessage;
import org.json.JSONException;
import org.json.JSONObject;

/**
 *
 * @author Pavel Stastny <pavel.stastny at gmail.com>
 */
public class ProcessManagerImpl extends ProcessManager {
    
    public static final Logger LOGGER = Logger.getLogger(Logger.class.getName());
    
    private static final int SCHEDULING_INTERVAL = 2000;
    private static final int POLL_INTERVAL = 200;
    
    private Map<String, Process> registeredProcesses = new HashMap<String, Process>();

    private ScheduledExecutorService executorService;
    private ScheduledExecutorService pollProcessBusService;
    
    // scheduled processes
    private List<ProcessScheduling> scheduled = new ArrayList<ProcessScheduling>();
    
    boolean started = false;
    
    final Object lock = new Object();
    
    public ProcessManagerImpl(String appname) {
        
        LOGGER.info("initializing manager '"+appname+"'");
        
        this.executorService = Executors.newScheduledThreadPool(1);
        this.executorService.schedule(new ProcessCheck(), SCHEDULING_INTERVAL, TimeUnit.MILLISECONDS);
        
        this.pollProcessBusService = Executors.newScheduledThreadPool(1);
        this.pollProcessBusService.schedule(new ProcessMessagePoll(appname), POLL_INTERVAL, TimeUnit.MILLISECONDS);
    }

    
    @Override
    public List<Process> getProcesses() {
        Set<String> keySet = this.registeredProcesses.keySet();
        List<Process> processes = new ArrayList<Process>();
        for (String key : keySet) {
            processes.add(this.registeredProcesses.get(key));
        }
        return processes;
    }

    
    
    @Override
    public void registerProcess(Process process) {
        this.registeredProcesses.put(process.getIdentifier(), process);
    }

    @Override
    public void deregisterProcess(Process process) {
        this.registeredProcesses.remove(process.getIdentifier());
    }


    @Override
    public  void startScheduling() {
        synchronized(lock) {
            if (!this.started) {
               executorService.schedule(new ProcessCheck(), SCHEDULING_INTERVAL, TimeUnit.MILLISECONDS);
               this.started = true;
            }
        }
    }

    @Override
    public void stopScheduling() {
        synchronized(lock) {
            if (!this.executorService.isShutdown() && (!this.executorService.isTerminated())) {
                this.executorService.shutdown();
            }
        }
    }

    @Override
    public void shutDown() {
        this.executorService.shutdown();
        this.pollProcessBusService.shutdown();
    }

    
    
    @Override
    public void schedule(Process process, Date schedulingTime) {
        synchronized(lock) {
            ProcessScheduling psch = new ProcessScheduling(process, schedulingTime);
            this.scheduled.add(psch);
            Collections.sort(this.scheduled);
        }
    }

    @Override
    public Process lookUpProcess(String ident) {
        return this.registeredProcesses.get(ident);
    }
    

    
    
    class ProcessMessagePoll implements Runnable {

        private String appname;

        public ProcessMessagePoll(String appname) {
            this.appname = appname;
        }
        
        
        @Override
        public void run() {
            try {
                IPCChannel channel = IPCChanelFactory.createFactory().createChannel(this.appname);
                IPCMessage mess = channel.receiveMessage("1");
                if (mess != null) {
                    processMessage(mess);
                }
                
                pollProcessBusService.schedule(new ProcessMessagePoll(appname), POLL_INTERVAL, TimeUnit.MILLISECONDS);
            } catch (ClassNotFoundException ex) {
                Logger.getLogger(ProcessManagerImpl.class.getName()).log(Level.SEVERE, null, ex);
            } catch (InstantiationException ex) {
                Logger.getLogger(ProcessManagerImpl.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IllegalAccessException ex) {
                Logger.getLogger(ProcessManagerImpl.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(ProcessManagerImpl.class.getName()).log(Level.SEVERE, null, ex);
            } catch (JSONException ex) {
                Logger.getLogger(ProcessManagerImpl.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

        private void processMessage(IPCMessage mess) throws JSONException {
            System.out.println("processing message :"+mess);
            String body = mess.getBody();
            JSONObject jsonObj = new JSONObject(body);
            String address = mess.getSourceAddress();
            Process process = registeredProcesses.get(address);
            if (process != null) {
                process.updateProcessIntrnalState(jsonObj);
            }
        }
    }
    

    
    
    class ProcessCheck implements Runnable {

        @Override
        public void run() {
            LOGGER.info("scheduling cycle");
            ProcessScheduling top = scheduled.get(0);
            Date d = new Date();
            if (d.after(top.getSchedulingTime())) {
                Process execProcess = scheduled.remove(0).getProcess();
                try {
                    execProcess.startMe();
                }catch(Throwable e) {
                    LOGGER.log(Level.SEVERE, e.getMessage(), e);
                }
                //scheduling next run
                executorService.schedule(new ProcessCheck(), SCHEDULING_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }
    }
    
    class ProcessScheduling implements Comparable<ProcessScheduling>{
        
        private Process process;
        private Date schedulingTime;

        public ProcessScheduling(Process process, Date schedulingTime) {
            this.process = process;
            this.schedulingTime = schedulingTime;
        }

        public Date getSchedulingTime() {
            return schedulingTime;
        }

        public Process getProcess() {
            return process;
        }

        @Override
        public int compareTo(ProcessScheduling o) {
            return this.schedulingTime.compareTo(o.getSchedulingTime());
        }
    }
}

